{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Online Feature Store\n",
    "\n",
    "This notebook includes examples of how to interact with the **Online** Feature Store in Hopsworks. The online feature store stores a subset of the feature data for real-time queries, suited for serving client-facing models. \n",
    "\n",
    "The online feature store contrasts to the **offline** feature store. The offline feature store contains historical data. The offline feature data is stored in Hive, a storage engine suited for large scale batch processing of data (such as *training* a machine learning model). On the other hand, the online feature store uses MySQL-Cluster database as the backend, a storage engine suited for smaller datasets that need to be queried in real-time."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Imports"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Starting Spark application\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<table>\n",
       "<tr><th>ID</th><th>YARN Application ID</th><th>Kind</th><th>State</th><th>Spark UI</th><th>Driver log</th><th>Current session?</th></tr><tr><td>7</td><td>application_1569258228210_0011</td><td>pyspark</td><td>idle</td><td><a target=\"_blank\" href=\"http://hopsworks0.logicalclocks.com:8088/proxy/application_1569258228210_0011/\">Link</a></td><td><a target=\"_blank\" href=\"http://hopsworks0.logicalclocks.com:8042/node/containerlogs/container_e01_1569258228210_0011_01_000001/demo_featurestore_admin000__meb10000\">Link</a></td><td>✔</td></tr></table>"
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "SparkSession available as 'spark'.\n"
     ]
    }
   ],
   "source": [
    "from hops import featurestore"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Get JDBC Connection to the Online Feature Store\n",
    "\n",
    "If your project's feature store has the online feature store enabled, there will be a storage connector for each user to access the online feature store. The storage connector can be accessed using the utility method `get_online_featurestore_connector()` in the Python SDK. The storage connector includes information about the JDBC connection, the password, port, host, and username etc."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "storage_connector = featurestore.get_online_featurestore_connector()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "'jdbc:mysql://10.0.2.15:3306/demo_featurestore_admin000'"
     ]
    }
   ],
   "source": [
    "storage_connector.connection_string"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "'demo_featurestore_admin000_meb1_onlinefeaturestore'"
     ]
    }
   ],
   "source": [
    "storage_connector.name"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "'JDBC connection to Hopsworks Project Online Feature Store NDB Database for user: demo_featurestore_admin000_meb1'"
     ]
    }
   ],
   "source": [
    "storage_connector.description"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "'password=YTnQHFxNHwMlEpZboyJCgpZFSqyyKgQHXnUHJzSrVNhOslGRqKifTmzvRnhudipF,user=demo_featurestore_admin000_meb1'"
     ]
    }
   ],
   "source": [
    "storage_connector.arguments"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Create a Feature Group with Online Feature Serving Enabled.\n",
    "\n",
    "When a feature group has online feature serving enabled, it means that its data will be stored in both Hive (for historical queries) and MySQL Cluster (for online queries). To enable online feature serving of a feature group simply set the flag `online=True` when creating a feature group, as illustrated below."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Create Sample Data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import SQLContext\n",
    "from pyspark.sql.types import StructType, StructField, StringType, TimestampType, LongType, IntegerType, FloatType\n",
    "sqlContext = SQLContext(sc)\n",
    "schema = StructType([StructField(\"id\", IntegerType(), True),\n",
    "                     StructField(\"val_1\", FloatType(), True),\n",
    "                     StructField(\"val_2\", IntegerType(), True)\n",
    "                        ])\n",
    "sample_df = sqlContext.createDataFrame([(999, 41251.52, 1), (998, 1319.4, 8), (997, 21219.1, 2)], schema)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+--------+-----+\n",
      "| id|   val_1|val_2|\n",
      "+---+--------+-----+\n",
      "|999|41251.52|    1|\n",
      "|998|  1319.4|    8|\n",
      "|997| 21219.1|    2|\n",
      "+---+--------+-----+"
     ]
    }
   ],
   "source": [
    "sample_df.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Save the Sample Data as a Feature group with online feature serving enabled\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "computing descriptive statistics for : online_featuregroup_test, version: 1\n",
      "computing feature correlation for: online_featuregroup_test, version: 1\n",
      "computing feature histograms for: online_featuregroup_test, version: 1\n",
      "computing cluster analysis for: online_featuregroup_test, version: 1\n",
      "Registering feature metadata...\n",
      "Registering feature metadata... [COMPLETE]\n",
      "Writing feature data to offline feature group (Hive)...\n",
      "Running sql: use demo_featurestore_admin000_featurestore against offline feature store\n",
      "Writing feature data to offline feature group (Hive)... [COMPLETE]\n",
      "Writing feature data to online feature group (MySQL)...\n",
      "Writing feature data to online feature group (MySQL)... [COMPLETE]\n",
      "Feature group created successfully"
     ]
    }
   ],
   "source": [
    "featurestore.create_featuregroup(sample_df, \"online_featuregroup_test\", online=True, primary_key=\"id\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "When creating a feature group, the spark dataframe is used to infer the data-schema and the feature types. The data schema is then used to create a Hive table (for offline data) and a MySQL table (for online data). If you want to have more control over the feature types for the MySQL table (e.g length of a varchar column) you can pass in the types in the optional argument `online_types`, which takes a dict of the form `feature_name --> feature_type`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "sample_df_types_test = sample_df.withColumnRenamed(\n",
    "    \"val_1\", \"val_1_type_test\").withColumnRenamed(\n",
    "    \"val_2\", \"val_2_type_test\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "computing descriptive statistics for : online_featuregroup_test_types, version: 1\n",
      "computing feature correlation for: online_featuregroup_test_types, version: 1\n",
      "computing feature histograms for: online_featuregroup_test_types, version: 1\n",
      "computing cluster analysis for: online_featuregroup_test_types, version: 1\n",
      "Registering feature metadata...\n",
      "Registering feature metadata... [COMPLETE]\n",
      "Writing feature data to offline feature group (Hive)...\n",
      "Running sql: use demo_featurestore_admin000_featurestore against offline feature store\n",
      "Writing feature data to offline feature group (Hive)... [COMPLETE]\n",
      "Writing feature data to online feature group (MySQL)...\n",
      "Writing feature data to online feature group (MySQL)... [COMPLETE]\n",
      "Feature group created successfully"
     ]
    }
   ],
   "source": [
    "featurestore.create_featuregroup(sample_df_types_test, \"online_featuregroup_test_types\", \n",
    "                                 online=True, primary_key=\"id\", online_types = {\"val_1\": \"DECIMAL\"})"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Read Features from Online Feature Store\n",
    "\n",
    "The same methods for reading the offline feature store can be used to read from the online feature store by setting the argument `online=True`. **However, NOTE**: as the online feature store is supposed to be used for feature serving, it should be queried with primary-key lookups for getting the best performance. In fact, it is highly discouraged to use the online feature serving for doing full-table-scans. If you find yourself frequently needing to use `get_featuregroup(online=True)` to get the entire feature group (full-table scan), you are probably better of using the offline feature store. The online feature store is intended for quick primary key lookups, not data analysis.\n",
    "\n",
    "To make the migration from the regular offline-featurestore API to the online-featurestore simple, for each example of reading from the online featurestore below, there is an accompanying example of reading from the offline feature store."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Free-Text SQL Query for the Online Feature Store\n",
    "\n",
    "Featuregroups are stored as tables with the naming `featuregroupname_version` as Hive tables for offline features, and MySQL tables for online features."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Running sql: use demo_featurestore_admin000_featurestore against offline feature store\n",
      "Running sql: SELECT val_1 FROM online_featuregroup_test_1 WHERE id=999 against online feature store"
     ]
    }
   ],
   "source": [
    "#primary key lookup in MySQL\n",
    "df = featurestore.sql(\"SELECT val_1 FROM online_featuregroup_test_1 WHERE id=999\", online=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+\n",
      "|  val_1|\n",
      "+-------+\n",
      "|41251.5|\n",
      "+-------+"
     ]
    }
   ],
   "source": [
    "df.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Free-Text SQL Query for the Offline Feature Store"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Running sql: use demo_featurestore_admin000_featurestore against offline feature store\n",
      "Running sql: SELECT val_1 FROM online_featuregroup_test_1 WHERE id=999 against offline feature store"
     ]
    }
   ],
   "source": [
    "# primary key lookup in Hive\n",
    "df = featurestore.sql(\"SELECT val_1 FROM online_featuregroup_test_1 WHERE id=999\", online=False) "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------+\n",
      "|   val_1|\n",
      "+--------+\n",
      "|41251.52|\n",
      "+--------+"
     ]
    }
   ],
   "source": [
    "df.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Read Online Version of Feature Group"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Running sql: use demo_featurestore_admin000_featurestore against offline feature store\n",
      "SQL string for the query created successfully\n",
      "Running sql: SELECT * FROM online_featuregroup_test_1 against online feature store"
     ]
    }
   ],
   "source": [
    "df = featurestore.get_featuregroup(\"online_featuregroup_test\", online=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+-------+-----+\n",
      "| id|  val_1|val_2|\n",
      "+---+-------+-----+\n",
      "|997|21219.1|    2|\n",
      "|999|41251.5|    1|\n",
      "|998| 1319.4|    8|\n",
      "+---+-------+-----+"
     ]
    }
   ],
   "source": [
    "df.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Read Offline Version of Feature Group"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Running sql: use demo_featurestore_admin000_featurestore against offline feature store\n",
      "SQL string for the query created successfully\n",
      "Running sql: SELECT * FROM online_featuregroup_test_1 against offline feature store"
     ]
    }
   ],
   "source": [
    "df = featurestore.get_featuregroup(\"online_featuregroup_test\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+--------+-----+\n",
      "| id|   val_1|val_2|\n",
      "+---+--------+-----+\n",
      "|998|  1319.4|    8|\n",
      "|997| 21219.1|    2|\n",
      "|999|41251.52|    1|\n",
      "+---+--------+-----+"
     ]
    }
   ],
   "source": [
    "df.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Read Online Version of individual Feature"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Running sql: use demo_featurestore_admin000_featurestore against offline feature store\n",
      "Logical query plan for getting 1 feature from the featurestore created successfully\n",
      "SQL string for the query created successfully\n",
      "Running sql: SELECT val_1 FROM online_featuregroup_test_1 against online feature store"
     ]
    }
   ],
   "source": [
    "df = featurestore.get_feature(\"val_1\", online=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+\n",
      "|  val_1|\n",
      "+-------+\n",
      "|21219.1|\n",
      "|41251.5|\n",
      "| 1319.4|\n",
      "+-------+"
     ]
    }
   ],
   "source": [
    "df.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Read Offline Version of individual Feature"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Running sql: use demo_featurestore_admin000_featurestore against offline feature store\n",
      "Logical query plan for getting 1 feature from the featurestore created successfully\n",
      "SQL string for the query created successfully\n",
      "Running sql: SELECT val_1 FROM online_featuregroup_test_1 against offline feature store"
     ]
    }
   ],
   "source": [
    "df = featurestore.get_feature(\"val_1\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------+\n",
      "|   val_1|\n",
      "+--------+\n",
      "|  1319.4|\n",
      "| 21219.1|\n",
      "|41251.52|\n",
      "+--------+"
     ]
    }
   ],
   "source": [
    "df.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Read Online Version of a list of individual Features\n",
    "\n",
    "The featues can potentially span multiple feature groups, as long as all feature groups have online serving enabled, the feature store query planner will join the features on the fly."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Running sql: use demo_featurestore_admin000_featurestore against offline feature store\n",
      "Logical query plan for getting 2 features from the featurestore created successfully\n",
      "SQL string for the query created successfully\n",
      "Running sql: SELECT val_1, val_2 FROM online_featuregroup_test_1 against online feature store"
     ]
    }
   ],
   "source": [
    "df = featurestore.get_features([\"val_1\", \"val_2\"], online=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+-----+\n",
      "|  val_1|val_2|\n",
      "+-------+-----+\n",
      "|21219.1|    2|\n",
      "|41251.5|    1|\n",
      "| 1319.4|    8|\n",
      "+-------+-----+"
     ]
    }
   ],
   "source": [
    "df.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Read Offline Version of a list of individual Features"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Running sql: use demo_featurestore_admin000_featurestore against offline feature store\n",
      "Logical query plan for getting 2 features from the featurestore created successfully\n",
      "SQL string for the query created successfully\n",
      "Running sql: SELECT val_1, val_2 FROM online_featuregroup_test_1 against offline feature store"
     ]
    }
   ],
   "source": [
    "df = featurestore.get_features([\"val_1\", \"val_2\"])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------+-----+\n",
      "|   val_1|val_2|\n",
      "+--------+-----+\n",
      "|  1319.4|    8|\n",
      "| 21219.1|    2|\n",
      "|41251.52|    1|\n",
      "+--------+-----+"
     ]
    }
   ],
   "source": [
    "df.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### List all feature groups that have online feature serving enabled"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "['online_featuregroup_test_types_1', 'online_featuregroup_test_1']"
     ]
    }
   ],
   "source": [
    "featurestore.get_featuregroups(online=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### List all features that have online feature serving enabled"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "['id', 'val_1_type_test', 'val_2_type_test', 'id', 'val_1', 'val_2']"
     ]
    }
   ],
   "source": [
    "featurestore.get_features_list(online=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Enable Online Feature Serving for a Feature Group that is Offline-Only\n",
    "\n",
    "By default when a feature group is created with `create_featuregroup()`, the feature group will not have online serving enabled, all data will be stored in the offline feature group (Hive). To create a feature group with online serving, pass the flag `online=True` to `create_featuregroup()` (an example is provided in the beginning of this notebook).\n",
    "\n",
    "If you want to enable online feature serving for a feature group dynamically, after the feature group have been created, you can use the API call `enable_featuregroup_online` (this will create a MySQL table in the backend). Conversely, if you want to disable online feature serving, use the API call `disable_featuregroup_online` (this will drop the MySQL table in the backend)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Create Feature Group without Online Feature Serving Enabled"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import SQLContext\n",
    "from pyspark.sql.types import StructType, StructField, StringType, TimestampType, LongType, IntegerType, FloatType\n",
    "sqlContext = SQLContext(sc)\n",
    "schema = StructType([StructField(\"id\", IntegerType(), True),\n",
    "                     StructField(\"test_col_1\", FloatType(), True),\n",
    "                     StructField(\"test_col_2\", IntegerType(), True)\n",
    "                        ])\n",
    "sample_df = sqlContext.createDataFrame([(999, 41251.52, 1), (998, 1319.4, 8), (997, 21219.1, 2)], schema)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "computing descriptive statistics for : enable_online_features_test, version: 1\n",
      "computing feature correlation for: enable_online_features_test, version: 1\n",
      "computing feature histograms for: enable_online_features_test, version: 1\n",
      "computing cluster analysis for: enable_online_features_test, version: 1\n",
      "Registering feature metadata...\n",
      "Registering feature metadata... [COMPLETE]\n",
      "Writing feature data to offline feature group (Hive)...\n",
      "Running sql: use demo_featurestore_admin000_featurestore against offline feature store\n",
      "Writing feature data to offline feature group (Hive)... [COMPLETE]\n",
      "Feature group created successfully"
     ]
    }
   ],
   "source": [
    "featurestore.create_featuregroup(sample_df, \"enable_online_features_test\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Enable Online Feature Serving for Offline-Only feature group"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 32,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Running sql: use demo_featurestore_admin000_featurestore against offline feature store\n",
      "SQL string for the query created successfully\n",
      "Running sql: SELECT * FROM enable_online_features_test_1 against offline feature store\n",
      "Online Feature Serving enabled successfully for featuregroup: enable_online_features_test"
     ]
    }
   ],
   "source": [
    "featurestore.enable_featuregroup_online(\"enable_online_features_test\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### DIsable Online Feature Serving for a feature group"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 33,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Online Feature Serving disabled successfully for featuregroup: enable_online_features_test"
     ]
    }
   ],
   "source": [
    "featurestore.disable_featuregroup_online(\"enable_online_features_test\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Insert into Offline/Online Feature Groups\n",
    "\n",
    "When inserting data into a feature group you can control whether the data should be written only to the offline feature group, only to the online feature group, or to both, using the parameters `online=True` and `offline=True`:"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Generate Sample Data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 34,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import SQLContext\n",
    "from pyspark.sql.types import StructType, StructField, StringType, TimestampType, LongType, IntegerType, FloatType\n",
    "sqlContext = SQLContext(sc)\n",
    "schema = StructType([StructField(\"id\", IntegerType(), True),\n",
    "                     StructField(\"val_1\", FloatType(), True),\n",
    "                     StructField(\"val_2\", IntegerType(), True)\n",
    "                        ])\n",
    "sample_df = sqlContext.createDataFrame([(111, 01251.52, 1), (958, 1919.4, 8), (697, 41219.1, 1)], schema)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Insert into Online Feature Group Only"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 35,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Running sql: use demo_featurestore_admin000_featurestore against offline feature store\n",
      "SQL string for the query created successfully\n",
      "Running sql: SELECT * FROM online_featuregroup_test_1 against offline feature store\n",
      "computing descriptive statistics for : online_featuregroup_test, version: 1\n",
      "computing feature correlation for: online_featuregroup_test, version: 1\n",
      "computing feature histograms for: online_featuregroup_test, version: 1\n",
      "computing cluster analysis for: online_featuregroup_test, version: 1\n",
      "Inserting data into online feature group online_featuregroup_test...\n",
      "Inserting data into online feature group online_featuregroup_test... [COMPLETE]\n",
      "Insertion into feature group was successful"
     ]
    }
   ],
   "source": [
    "featurestore.insert_into_featuregroup(sample_df, \"online_featuregroup_test\", \n",
    "                                      online=True, offline=False, mode=\"append\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Insert into Offline Feature Group Only"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Running sql: use demo_featurestore_admin000_featurestore against offline feature store\n",
      "SQL string for the query created successfully\n",
      "Running sql: SELECT * FROM online_featuregroup_test_1 against offline feature store\n",
      "computing descriptive statistics for : online_featuregroup_test, version: 1\n",
      "computing feature correlation for: online_featuregroup_test, version: 1\n",
      "computing feature histograms for: online_featuregroup_test, version: 1\n",
      "computing cluster analysis for: online_featuregroup_test, version: 1\n",
      "Inserting data into offline feature group online_featuregroup_test...\n",
      "Running sql: use demo_featurestore_admin000_featurestore against offline feature store\n",
      "Inserting data into offline feature group online_featuregroup_test... [COMPLETE]\n",
      "Insertion into feature group was successful"
     ]
    }
   ],
   "source": [
    "featurestore.insert_into_featuregroup(sample_df, \"online_featuregroup_test\", \n",
    "                                      online=False, offline=True, mode=\"append\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Insert into Online and Offline Feature Group"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 37,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Running sql: use demo_featurestore_admin000_featurestore against offline feature store\n",
      "SQL string for the query created successfully\n",
      "Running sql: SELECT * FROM online_featuregroup_test_1 against offline feature store\n",
      "computing descriptive statistics for : online_featuregroup_test, version: 1\n",
      "computing feature correlation for: online_featuregroup_test, version: 1\n",
      "computing feature histograms for: online_featuregroup_test, version: 1\n",
      "computing cluster analysis for: online_featuregroup_test, version: 1\n",
      "Inserting data into offline feature group online_featuregroup_test...\n",
      "Running sql: use demo_featurestore_admin000_featurestore against offline feature store\n",
      "Inserting data into offline feature group online_featuregroup_test... [COMPLETE]\n",
      "Inserting data into online feature group online_featuregroup_test...\n",
      "Inserting data into online feature group online_featuregroup_test... [COMPLETE]\n",
      "Insertion into feature group was successful"
     ]
    }
   ],
   "source": [
    "featurestore.insert_into_featuregroup(sample_df, \"online_featuregroup_test\", \n",
    "                                      online=True, offline=True, mode=\"overwrite\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "PySpark",
   "language": "",
   "name": "pysparkkernel"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "python",
    "version": 2
   },
   "mimetype": "text/x-python",
   "name": "pyspark",
   "pygments_lexer": "python2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}